使用Python和InfluxDB v2.0入门

导航至

(更新InfluxDB 3.0已放弃Flux和内置任务引擎。用户可以使用基于Python的Quix等外部工具在InfluxDB 3.0中创建任务。)

拥有200多个插件,Telegraf具有广泛的数据收集应用。然而,有时您需要收集自定义数据或可能希望将外部工具集成到您的时序数据分析中。在这种情况下,利用InfluxDB的客户端库是有意义的。今天我们将重点介绍如何使用最新的InfluxDB Python客户端库与InfluxDB v2.0。如果您正在运行InfluxDB v1.x,请查看此教程

InfluxDB Python客户端库

自v1.x以来,InfluxDB Python客户端经历了一些相当大的改进。它更快——快得多——并且更容易使用。它支持多进程,并允许您将查询结果作为Pandas DataFrame返回。WriteAPI支持同步、异步和批处理写入到InfluxDB v2.0。WriteAPI还支持4种不同的写入选项。QueryAPI也支持多个查询选项,并具有中断查询流的能力。

要求

本教程是在MacOS系统上执行的,其中Python 3是通过Homebrew安装的,Python 3.6和Python 3.7分别通过Conda安装。需要Python 3.7来执行多进程。我建议设置额外的工具,如virtualenvpyenvconda-env,以简化Python和客户端的安装。否则,完整的依赖项可以在这里找到。

安装

要安装InfluxDB Python客户端库,只需运行

pip install influxdb-client

如果您已经安装了客户端,可以使用以下命令升级它

pip3 install --upgrade influxdb-client

收集认证参数

为了使用客户端,您需要收集以下参数

  • 桶名称或ID

按照以下文档操作以 创建存储桶。要 查看您的存储桶,可以使用用户界面或执行以下操作

influx -t <your-token> -o <your-org> bucket find
  • 令牌

按照以下文档操作以 创建令牌。要 查看您的令牌,可以使用用户界面或执行以下操作

influx auth find
  • 组织

查看您的组织,可以使用用户界面或执行以下操作

influx org find

导入和连接

导入客户端

from influxdb_client import InfluxDBClient

建立连接

client = InfluxDBClient(url="https://127.0.0.1:9999", token=token, org=org)

写入

在本教程中,我们将探讨所有可以写入与Coyote Creek水位相关的数据的方法。我们的模式将如下所示

存储桶: “my-bucket” 测量: “h2o_feet” 标签键: “location” 标签值: “coyote_creek” 字段键: “water_level” 字段值: 1

首先,实例化WriteAPI

write_api = client.write_api()

WriteAPI还支持4种不同的写入选项:行协议字符串、行协议字节、数据点结构和字典样式。

写入选项一 - 行协议字符串

行协议是Influx的摄取格式。当您已经有转换成行协议的数据时(例如txt文件),它非常有用。

write_api.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=1"])

写入选项二 - 行协议字节

这是已经以UTF-8编码的字节数组的行协议。客户端将行协议字符串内部转换为这种格式。

write_api.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=1".encode()])

写入选项三 - 数据点结构

当在客户端代码中构建行协议时,此结构非常有用。Point()类确保您的数据被正确序列化成行协议。

write_api.write("my-bucket", "my-org", Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 1).time(1))

写入选项四 - 字典样式

这是表示数据与Python最兼容的方式。字典在内部转换为Point()结构。

write_api.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, "fields": {"water_level": 1}, "time": 1}])

现在我们知道了所有可以写入数据库的方法,让我们来探讨如何执行这些写入。我们有几个调整选项可用。我们可以指定同步和异步写入。我们还可以调整写入的批处理。

指定写入选项:同步写入

首先,您需要使用以下方式导入您的写入选项方法

from influxdb_client.client.write_api import SYNCHRONOUS

要指定同步写入,只需将选项作为参数实例化WriteAPI。

write_api = client.write_api(write_options=SYNCHRONOUS)

指定写入选项:异步写入

首先,您需要使用以下方式导入您的写入选项方法

from influxdb_client.client.write_api import ASYNCHRONOUS

要指定异步写入,只需将选项作为参数实例化WriteAPI。

write_api = client.write_api(write_options=ASYNCHRONOUS)

指定写入选项:批处理

默认的WriteApi实例使用批处理。您可以指定以下批处理参数

  • batch_size:在批次中收集的数据点的数量
  • flush_interval:在将批次写入之前前的毫秒数
  • jitter_interval:增加批次刷新间隔的随机量毫秒数
  • retry_interval:重试失败的写入的毫秒数。当InfluxDB服务器未指定"Retry-After"头时使用重试间隔

请参阅词汇表了解这些参数的更多功能。

批处理选项示例

write_api = client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=10_000, jitter_interval=2_000, retry_interval=5_000))

查询数据库

在查询Influx方面,您有两个选项。您可以使用表格结构或流查询。您还可以在返回所需数据后中断流。但是,为了执行查询,我们首先必须实例化QueryAPI。

首先,实例化QueryAPI

query_api = client.query_api()

生成 Flux 查询

您必须使用 Flux 查询您的数据。如果您从未编写过 Flux 查询,我建议您探索以下资源

针对我们刚刚编写的数据点的 Flux 查询看起来像

query = ' from(bucket:"my-bucket")\
|> range(start: -10m)\
|> filter(fn:(r) => r._measurement == "h2o_level")\
|> filter(fn: (r) => r.location == "coyote_creek")\
|> filter(fn:(r) => r._field == "water_level" )'

然后我们可以使用表格结构来返回我们的数据

查询选项一 - 表格结构

result = client.query_api().query(org=my-org, query=query)

results = []
for table in result:
    for record in table.records:
        results.append((record.get_value(), record.get_field()))

print(results)
[(Water_level, 1)]

Flux 对象有以下方法用于访问您的数据

  • .get_measurement()
  • .get_field()
  • .values.get("<your tags>")
  • .get_time()
  • .get_start()
  • .get_stop()
  • .get_measurement()

查询选项二 - 流

records = client.query_api.query_stream(org= "my-org", query=query)

for record in records:
    print(f'Temperature in {record["location"]} is {record["_value"]}')

Temperature in coyote_creek is 1.

使用 Python 客户端写入和查询 InfluxDB 点的脚本

现在我们了解了所有选项,让我们看看一个完整的脚本可能的样子,用于使用 Python 客户端将点和查询写入 InfluxDB。您可以在此处找到包含此脚本的存储库 这里

from influxdb_client import InfluxDBClient

org = "my-org"
bucket = "my-bucket"
token = $my-token
query = 'from(bucket: "my-bucket")\
|> range(start: -10m)\
|> filter(fn: (r) => r._measurement == "h2o_level")\
|> filter(fn: (r) => r._field == "water_level")\
|> filter(fn: (r) => r.location == "coyote_creek")'

#establish a connection
client = InfluxDBClient(url="https://127.0.0.1:9999", token=token, org=org)

#instantiate the WriteAPI and QueryAPI
write_api = client.write_api()
query_api = client.query_api()

#create and write the point
p = Point("h2o_level").tag("location", "coyote_creek").field("water_level", 1)
write_api.write(bucket=bucket,org=org,record=p)
#return the table and print the result
result = client.query_api().query(org=org, query=query)
results = []
for table in result:
    for record in table.records:
        results.append((record.get_value(), record.get_field()))
print(results)

我希望这个教程能帮助您开始使用 Influx。像往常一样,如果您遇到困难,请在我们 社区网站 Slack 频道分享。我们很乐意获取您的反馈并帮助您解决遇到的问题。